package com.process;

import canal.bean.RowData;
import com.alibaba.fastjson.JSON;
import com.base.MySqlBaseETL;
import com.bean.*;
import com.utils.KafkaUtil;
import com.utils.RedisUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import redis.clients.jedis.Jedis;

/**
 * @Description: Real-time ETL for goods (product) data
 * @Author: Sky
 * @Times : 2021/8/15 12:33
 */
public class GoodsDataETL {

    /** Topic name handed to {@code MySqlBaseETL#KafkaConsummer} as the job's source. */
    private static final String SOURCE_TOPIC = "ods_shop";

    /** Only rows whose table name equals this value are processed. */
    private static final String GOODS_TABLE = "foo_goods";

    /** Kafka topic the widened goods JSON is produced to. */
    private static final String SINK_TOPIC = "dwd_goods";

    /** Rows must carry more than this many columns to be processed. */
    private static final int MIN_COLUMN_COUNT = 3;

    /** Redis database index selected before any dimension lookup. */
    private static final int REDIS_DB_INDEX = 1;

    /** Redis hash keys the dimension JSON documents are read from. */
    private static final String DIM_SHOPS_KEY = "foo_shop:dim_shops";
    private static final String DIM_GOODS_CATS_KEY = "foo_shop:dim_goods_cats";
    private static final String DIM_SHOP_CATS_KEY = "foo_shop:dim_shop_cats";
    private static final String DIM_ORG_KEY = "foo_shop:dim_org";

    /**
     * Program entry point: builds the job and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        GoodsDataETL goodsDataETL = new GoodsDataETL();
        goodsDataETL.process();
    }

    /**
     * Builds and executes the goods ETL pipeline:
     * <ol>
     *   <li>read row data through {@code MySqlBaseETL#KafkaConsummer},</li>
     *   <li>keep only {@code foo_goods} rows with more than three columns,</li>
     *   <li>widen each row with dimension data fetched synchronously from Redis,</li>
     *   <li>serialize the widened entity to JSON and produce it to {@code dwd_goods}.</li>
     * </ol>
     *
     * @throws Exception if the Flink job cannot be built or fails during execution
     */
    public void process() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        MySqlBaseETL mysqlEtl = new MySqlBaseETL();
        DataStream<RowData> dwdGoods = mysqlEtl.KafkaConsummer(SOURCE_TOPIC, env);

        // 1. Keep only the change rows of the goods table.
        DataStream<RowData> goodsCanalDS = dwdGoods.filter(new GoodsRowFilter());

        // Echoes every accepted row to stdout.
        goodsCanalDS.print();

        // 2. Pull the dimension data from Redis with synchronous I/O and build the wide record.
        DataStream<GoodsWideEntity> goodsEntityDataStream = goodsCanalDS.map(new GoodsDimensionJoiner());

        // 3. Convert the wide record to a JSON string.
        DataStream<String> goodsWideJsonDataStream = goodsEntityDataStream.map(new GoodsJsonSerializer());

        // 4. Store the JSON in Kafka.
        goodsWideJsonDataStream.addSink(new GoodsKafkaSink());

        env.execute();
    }

    /**
     * Accepts a row only when it belongs to {@code foo_goods} and has more than three columns.
     * Rows with a null table name or null columns are rejected instead of raising a
     * {@link NullPointerException}.
     */
    private static final class GoodsRowFilter implements FilterFunction<RowData> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(RowData rowData) throws Exception {
            // Constant-first comparison tolerates a missing table name.
            if (!GOODS_TABLE.equals(rowData.getTableName())) {
                return false;
            }
            return rowData.getColumns() != null && rowData.getColumns().size() > MIN_COLUMN_COUNT;
        }
    }

    /**
     * Widens a goods row with shop, category and organisation dimensions read from Redis hashes.
     * One Jedis connection is obtained per function instance in {@link #open} and released in
     * {@link #close}.
     */
    private static final class GoodsDimensionJoiner extends RichMapFunction<RowData, GoodsWideEntity> {
        private static final long serialVersionUID = 1L;

        // Created in open() on the worker; never shipped with the serialized function.
        private transient Jedis jedis;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = RedisUtil.getJedis().getResource();
            jedis.select(REDIS_DB_INDEX);
        }

        @Override
        public void close() throws Exception {
            // open() may have failed before the field was assigned, so guard against null.
            // The connection is released unconditionally so it is never left checked out.
            if (jedis != null) {
                jedis.close();
                jedis = null;
            }
        }

        /**
         * Builds the wide goods record for one row.
         *
         * @param rowData a goods row accepted by {@link GoodsRowFilter}
         * @return the row's own columns plus the looked-up dimension attributes
         * @throws Exception if a Redis call or a dimension parse fails
         */
        @Override
        public GoodsWideEntity map(RowData rowData) throws Exception {
            // NOTE(review): how the getDim* parsers react to a missing hash entry (null JSON) is
            // not visible here; a missing dimension may surface as a NullPointerException below.
            String shopJSON = jedis.hget(DIM_SHOPS_KEY, column(rowData, "shopId"));
            DimShopsDBEntity dimShop = DimShopsDBEntity.getDimShops(shopJSON);

            // Goods categories: start at the row's category and follow parentId upwards twice.
            String thirdCatJSON = jedis.hget(DIM_GOODS_CATS_KEY, column(rowData, "goodsCatId"));
            DimGoodsCatDBEntity dimThirdCat = DimGoodsCatDBEntity.getDimGoodCat(thirdCatJSON);
            String secondCatJSON = jedis.hget(DIM_GOODS_CATS_KEY, dimThirdCat.getParentId());
            DimGoodsCatDBEntity dimSecondCat = DimGoodsCatDBEntity.getDimGoodCat(secondCatJSON);
            String firstCatJSON = jedis.hget(DIM_GOODS_CATS_KEY, dimSecondCat.getParentId());
            DimGoodsCatDBEntity dimFirstCat = DimGoodsCatDBEntity.getDimGoodCat(firstCatJSON);

            // Shop-category dimensions referenced by the row.
            // NOTE(review): neither result is written to the wide entity, and the 1/2 suffixes
            // look swapped relative to the second/first naming - confirm the intended mapping.
            String secondShopCatJson = jedis.hget(DIM_SHOP_CATS_KEY, column(rowData, "shopCatId1"));
            DimShopCatDBEntity dimSecondShopCat = DimShopCatDBEntity.getDimShop(secondShopCatJson);
            String firstShopCatJson = jedis.hget(DIM_SHOP_CATS_KEY, column(rowData, "shopCatId2"));
            DimShopCatDBEntity dimFirstShopCat = DimShopCatDBEntity.getDimShop(firstShopCatJson);

            // City comes from the shop's area id; the region is the city's parent organisation.
            String cityJSON = jedis.hget(DIM_ORG_KEY, dimShop.getAreaId() + "");
            DimOrgDBEntity dimOrgCity = DimOrgDBEntity.getDimOrg(cityJSON);
            String regionJSON = jedis.hget(DIM_ORG_KEY, dimOrgCity.getParentId() + "");
            DimOrgDBEntity dimOrgRegion = DimOrgDBEntity.getDimOrg(regionJSON);

            GoodsWideEntity goodsWideEntity = new GoodsWideEntity();
            goodsWideEntity.setGoodsId(Long.valueOf(column(rowData, "goodsId")));
            goodsWideEntity.setGoodsSn(column(rowData, "goodsSn"));
            goodsWideEntity.setProductNo(column(rowData, "productNo"));
            goodsWideEntity.setGoodsName(column(rowData, "goodsName"));
            goodsWideEntity.setGoodsImg(column(rowData, "goodsImg"));
            goodsWideEntity.setShopId(column(rowData, "shopId"));
            goodsWideEntity.setShopName(dimShop.getShopName());
            goodsWideEntity.setGoodsType(column(rowData, "goodsType"));
            goodsWideEntity.setMarketPrice(column(rowData, "marketPrice"));
            goodsWideEntity.setShopPrice(column(rowData, "shopPrice"));
            // NOTE(review): warnStock is filled from the "goodsStock" column - confirm this is intended.
            goodsWideEntity.setWarnStock(column(rowData, "goodsStock"));
            goodsWideEntity.setGoodsUnit(column(rowData, "goodsUnit"));
            goodsWideEntity.setGoodsTips(column(rowData, "goodsTips"));
            goodsWideEntity.setIsSale(column(rowData, "isSale"));
            goodsWideEntity.setIsBest(column(rowData, "isBest"));
            goodsWideEntity.setIsHot(column(rowData, "isHot"));
            goodsWideEntity.setIsNew(column(rowData, "isNew"));
            goodsWideEntity.setIsRecom(column(rowData, "isRecom"));
            goodsWideEntity.setGoodsCatIdPath(column(rowData, "goodsCatIdPath"));

            // NOTE(review): all three category levels are written through the same
            // setCatId/setCatName pair, so only the first-level values (written last) survive.
            // Presumably level-specific setters were intended; verify against GoodsWideEntity.
            // Third-level category
            goodsWideEntity.setCatId(dimThirdCat.getCatId());
            goodsWideEntity.setCatName(dimThirdCat.getCatName());
            // Second-level category
            goodsWideEntity.setCatId(dimSecondCat.getCatId());
            goodsWideEntity.setCatName(dimSecondCat.getCatName());
            // First-level category
            goodsWideEntity.setCatId(dimFirstCat.getCatId());
            goodsWideEntity.setCatName(dimFirstCat.getCatName());

            goodsWideEntity.setBrandId(column(rowData, "brandId"));
            goodsWideEntity.setGoodsDesc(column(rowData, "goodsDesc"));
            goodsWideEntity.setGoodsStatus(column(rowData, "goodsStatus"));
            goodsWideEntity.setSaleNum(column(rowData, "saleNum"));
            goodsWideEntity.setSaleTime(column(rowData, "saleTime"));
            goodsWideEntity.setVisitNum(column(rowData, "visitNum"));
            goodsWideEntity.setAppraiseNum(column(rowData, "appraiseNum"));
            goodsWideEntity.setIsSpec(column(rowData, "isSpec"));
            goodsWideEntity.setGallery(column(rowData, "gallery"));
            goodsWideEntity.setGoodsSeoKeywords(column(rowData, "goodsSeoKeywords"));
            goodsWideEntity.setIllegalRemarks(column(rowData, "illegalRemarks"));
            goodsWideEntity.setDataFlag(column(rowData, "dataFlag"));
            goodsWideEntity.setCreateTime(column(rowData, "createTime"));
            goodsWideEntity.setIsFreeShipping(column(rowData, "isFreeShipping"));
            goodsWideEntity.setGoodsSerachKeywords(column(rowData, "goodsSerachKeywords"));
            goodsWideEntity.setModifyTime(column(rowData, "modifyTime"));
            goodsWideEntity.setCityId(dimOrgCity.getOrgId());
            goodsWideEntity.setCityName(dimOrgCity.getOrgName());
            goodsWideEntity.setRegionId(dimOrgRegion.getOrgId());
            goodsWideEntity.setRegionName(dimOrgRegion.getOrgName());
            return goodsWideEntity;
        }

        /** Returns the value stored under {@code name} in the row's columns. */
        private static String column(RowData rowData, String name) {
            return rowData.getColumns().get(name);
        }
    }

    /** Serializes a wide goods record to its JSON string form. */
    private static final class GoodsJsonSerializer implements MapFunction<GoodsWideEntity, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(GoodsWideEntity goodsWideEntity) throws Exception {
            return JSON.toJSONString(goodsWideEntity);
        }
    }

    /**
     * Produces each incoming string to the {@code dwd_goods} topic with a null key.
     * A single producer is created per sink instance and closed with it; creating one per
     * record would leak the producer's resources and never close it.
     */
    private static final class GoodsKafkaSink extends RichSinkFunction<String> {
        private static final long serialVersionUID = 1L;

        // Created in open() on the worker; never shipped with the serialized function.
        private transient KafkaProducer<String, String> kafkaProducer;

        @Override
        public void open(Configuration parameters) throws Exception {
            kafkaProducer = new KafkaProducer<>(KafkaUtil.kafkaCons);
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            ProducerRecord<String, String> logs = new ProducerRecord<>(SINK_TOPIC, null, value);
            // NOTE(review): the send result is not inspected, so delivery failures are not
            // surfaced to the job - confirm whether that is acceptable.
            kafkaProducer.send(logs);
        }

        @Override
        public void close() throws Exception {
            // open() may have failed before the field was assigned, so guard against null.
            if (kafkaProducer != null) {
                kafkaProducer.close();
                kafkaProducer = null;
            }
        }
    }
}
